
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import static org.apache.flink.table.api.Expressions.*;


/**
 * Demo job showing a Table API over-window on an event-time stream.
 *
 * <p>The job builds a small in-memory stream of {@code OrderStream} records, assigns
 * event-time timestamps and periodic watermarks to it, converts it to a {@link Table}
 * with {@code rowtime} as the event-time attribute, defines an over window partitioned
 * by {@code user} and ordered by {@code rowtime} (unbounded preceding up to the current
 * range), and prints the result as a retract stream.
 */
public class  OverWindowAggregation
{
    /**
     * Builds and runs the demo pipeline.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Ask for periodic watermarks at the shortest practical interval (1 ms) so the
        // tiny bounded input below still sees watermark progress.
        env.getConfig().setAutoWatermarkInterval(1L);

        // Sample orders: (id, user, product, amount, rowtime in epoch millis).
        // The wall-clock times in the trailing comments correspond to UTC+8.
        DataStream<OrderStream> orderA = env.fromCollection(Arrays.asList(
                new OrderStream(1, 1L, "beer",   3, 1505529000000L), // 2017-09-16 10:30:00
                new OrderStream(2, 1L, "beer",   3, 1505529000000L), // 2017-09-16 10:30:00
                new OrderStream(3, 3L, "rubber", 2, 1505527800000L), // 2017-09-16 10:10:00
                new OrderStream(4, 3L, "rubber", 2, 1505527800000L), // 2017-09-16 10:10:00
                new OrderStream(5, 1L, "diaper", 4, 1505528400000L), // 2017-09-16 10:20:00
                new OrderStream(6, 1L, "diaper", 4, 1505528400000L)  // 2017-09-16 10:20:00
        ));

        DataStream<OrderStream> timestampedOrders =
                orderA.assignTimestampsAndWatermarks(new OrderTimestampAssigner());

        // Expose user/product/amount as columns and rowtime as the event-time attribute.
        Table orders = tEnv.fromDataStream(
                timestampedOrders,
                $("user"), $("product"), $("amount"), $("rowtime").rowtime());

        // Over window per user, ordered by event time, selecting every column.
        Table result1 = orders
                .window(
                        Over.partitionBy($("user"))
                                .orderBy($("rowtime"))
                                .preceding(UNBOUNDED_RANGE)
                                .following(CURRENT_RANGE)
                                .as("w"))
                .select(
                        $("*")
                );

        // Same over window, computing avg / max / min of amount per user.
        // This table is built but only printed when the line further down is enabled.
        Table result2 = orders
                .window(
                        Over.partitionBy($("user"))
                                .orderBy($("rowtime"))
                                .preceding(UNBOUNDED_RANGE)
                                .following(CURRENT_RANGE)
                                .as("w"))
                .select(
                        $("user"),
                        $("amount").avg().over($("w")),
                        $("amount").max().over($("w")),
                        $("amount").min().over($("w"))
                );
        // Original note (translated): "Group and aggregate by product and user; start() and
        // end() here are the time attributes (start, end) of each group."
        // NOTE(review): nothing in this class calls start()/end() or groups by product —
        // presumably left over from a group-window example; confirm and remove.

        tEnv.toRetractStream(result1, Row.class).print();
        // Uncomment to print the aggregated variant as well:
//        tEnv.toRetractStream(result2, Row.class).print();

        env.execute();
    }

    /**
     * Periodic watermark assigner for {@code OrderStream} records.
     *
     * <p>Uses {@code OrderStream.rowtime} as the event timestamp and reports a watermark that
     * trails the largest timestamp seen so far by {@link #MAX_OUT_OF_ORDERNESS_MS}. Each
     * extracted record is also printed to stdout for debugging.
     */
    private static final class OrderTimestampAssigner
            implements AssignerWithPeriodicWatermarks<OrderStream>
    {
        private static final long serialVersionUID = 1L;

        /** Maximum allowed out-of-orderness: 10 seconds, in milliseconds. */
        private static final long MAX_OUT_OF_ORDERNESS_MS = 10000L;

        /** Largest event timestamp observed so far; starts at 0 until the first record arrives. */
        private long currentMaxTimestamp = 0L;

        /** Formats epoch millis for the debug output, using the JVM default time zone. */
        private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        /**
         * Returns the current watermark.
         *
         * @return a watermark at {@code currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS}
         */
        @Override
        public Watermark getCurrentWatermark()
        {
            return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MS);
        }

        /**
         * Extracts the event timestamp from a record and advances the maximum seen timestamp.
         *
         * @param element         the record being processed
         * @param recordTimestamp timestamp previously attached to the record (ignored)
         * @return the record's {@code rowtime}
         */
        @Override
        public long extractTimestamp(OrderStream element, long recordTimestamp)
        {
            long timestamp = element.rowtime;
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);

            // Compute the watermark once; it is only needed here for the debug line.
            long watermark = getCurrentWatermark().getTimestamp();

            System.out.println("id:" + element.id + "," + "user:" + element.user + ","
                    + "eventtime:[" + element.rowtime + "|" + sdf.format(element.rowtime)
                    + "],currentMaxTimestamp:[" + currentMaxTimestamp + "|"
                    + sdf.format(currentMaxTimestamp)
                    + "],watermark:[" + watermark + "|" + sdf.format(watermark) + "]");
            return timestamp;
        }
    }

}




